package com.gbase8c.dmt.migration;

import com.gbase8c.dmt.config.DmtConfig;
import com.gbase8c.dmt.dao.RecordDao;
import com.gbase8c.dmt.dao.TaskDao;
import com.gbase8c.dmt.migration.job.AbstractJobThread;
import com.gbase8c.dmt.migration.job.DataXJobThread;
import com.gbase8c.dmt.migration.job.SqlJobThread;
import com.gbase8c.dmt.model.enums.MigrationObjectType;
import com.gbase8c.dmt.model.migration.config.Task;
import com.gbase8c.dmt.model.migration.dto.DboDto;
import com.gbase8c.dmt.model.migration.dto.MigrationObject;
import com.gbase8c.dmt.model.migration.dto.TableMapper;
import com.gbase8c.dmt.model.migration.record.ExecuteRecord;
import com.gbase8c.dmt.model.migration.record.JobThreadRecord;
import com.gbase8c.dmt.model.migration.record.MigrationRecordDto;
import com.gbase8c.dmt.model.enums.MigrationTaskStatus;
import com.gbase8c.dmt.model.enums.Status;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import javax.sql.DataSource;
import java.io.Closeable;
import java.util.*;
import java.util.concurrent.*;
import java.util.stream.Collectors;

@Slf4j
@Getter
@Setter
public class MigrationThread extends Thread implements Closeable {

    private String taskId;
    private MigrationRecordDto recordDto;
    private DmtConfig dmtConfig;
    private List<ExecuteRecord> recordList = new CopyOnWriteArrayList<>();
    private List<TableMapper> tableMappers = new CopyOnWriteArrayList<>();
    private RecordDao recordDao;
    private TaskDao taskDao;
    private volatile MigrationTaskStatus migrationTaskStatus = MigrationTaskStatus.RUNNING;
    private ThreadPoolTaskExecutor threadPoolTaskExecutor = null;
    private DataSource tarDataSource;
    protected volatile boolean suspend = false;
    protected volatile boolean stop = false;
    private final BlockingQueue<Future<JobThreadRecord>> queue = new LinkedBlockingQueue<>();
    private CompletionService<JobThreadRecord> completionService = null;
    private Map<MigrationObjectType, List<? extends MigrationObject>> migrationObjectListMap;
    private Map<MigrationObjectType, List<AbstractJobThread>> jobListMap = new LinkedHashMap<>();
    private Map<MigrationObjectType, MoLogger> moLoggerMap = new HashMap<>();

    public static MigrationThread build(DboDto dboDto,
                                        Map<MigrationObjectType, List<? extends MigrationObject>> orderedDbObject,
                                        DataSource tarDataSource, RecordDao recordDao, TaskDao taskDao) {
        MigrationThread migrationThread = new MigrationThread();
        migrationThread.setTarDataSource(tarDataSource);
        migrationThread.setTaskId(dboDto.getTask().getId());
        migrationThread.setRecordDao(recordDao);
        migrationThread.setTaskDao(taskDao);
        migrationThread.setMigrationObjectListMap(orderedDbObject);
        return migrationThread;
    }

    public void init(DmtConfig dmtConfig) {
        this.migrationTaskStatus = MigrationTaskStatus.RUNNING;
        this.setDmtConfig(dmtConfig);
        initThreadPool();
        initMigrationRecord();
        initTaskList();
    }

    /**
     * 初始化线程池
     */
    private void initThreadPool() {
        //todo:初始化线程池,前期默认，后续换成配置中取
        threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
        if (null != dmtConfig) {
            threadPoolTaskExecutor.setCorePoolSize(dmtConfig.getTaskPoolCoreSize());
            threadPoolTaskExecutor.setMaxPoolSize(dmtConfig.getTaskPoolMaxSize());
        } else {
            threadPoolTaskExecutor.setCorePoolSize(10);
            threadPoolTaskExecutor.setMaxPoolSize(15);
        }
        threadPoolTaskExecutor.setQueueCapacity(200);
        threadPoolTaskExecutor.setKeepAliveSeconds(60);
        threadPoolTaskExecutor.setThreadNamePrefix("migrationTask-");
        threadPoolTaskExecutor.setWaitForTasksToCompleteOnShutdown(true);
        threadPoolTaskExecutor.setAwaitTerminationSeconds(60);
        threadPoolTaskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        threadPoolTaskExecutor.initialize();
        this.completionService = new ExecutorCompletionService<>(threadPoolTaskExecutor, queue);
        log.info("\n  maxPoolSize   : {} " +
                "\n  corePoolSize  : {} ", threadPoolTaskExecutor.getMaxPoolSize(), threadPoolTaskExecutor.getCorePoolSize());
    }

    private void initTaskList() {
        for (Map.Entry<MigrationObjectType, List<? extends MigrationObject>> entry : migrationObjectListMap.entrySet()) {
            List<AbstractJobThread> jobList = new ArrayList<>();
            MigrationObjectType type = entry.getKey();
            MoLogger moLogger = new MoLogger(dmtConfig.getWorkDir(), dmtConfig.getTaskId(), type.getName());
            moLoggerMap.put(type, moLogger);
            for (MigrationObject obj : entry.getValue()) {
                AbstractJobThread jobThread;
                if (type == MigrationObjectType.DATA) {
                    jobThread = new DataXJobThread(type, obj, tarDataSource, moLogger);
                } else {
                    jobThread = new SqlJobThread(type, obj, tarDataSource, moLogger);
                }
                jobList.add(jobThread);
            }
            this.jobListMap.put(type, jobList);
        }

    }

    private void initMigrationRecord() {
        this.recordDto = new MigrationRecordDto();
        this.recordDto.setTaskId(taskId);
        this.recordDto.setStartTime(new Date());
    }

    /**
     * 转换结果值
     */
    private Map<String, List<ExecuteRecord>> convertRecords(List<ExecuteRecord> recordList) {
        Map<String, List<ExecuteRecord>> schemaMap = Maps.newHashMap();
        if (!CollectionUtils.isEmpty(recordList)) {
            schemaMap = recordList.stream().collect(Collectors.groupingBy(ExecuteRecord::getSchemaName));
        }
        return schemaMap;
    }

    @Override
    public synchronized void start() {
        this.migrationTaskStatus = MigrationTaskStatus.RUNNING;
        Task task = taskDao.get(taskId);
        task.setStatus(MigrationTaskStatus.RUNNING);
        MigrationObjectService.updateTaskStatus(taskId, MigrationTaskStatus.RUNNING);
        taskDao.update(task);
        log.info("[MigrationThread] mission : " + taskId + "， started !!! ");
        jobListMap.forEach((type, jobList) -> {
            MoLogger moLogger = moLoggerMap.get(type);
            if (moLogger != null) {
                moLogger.start();
            }
            long begin = System.currentTimeMillis();
            //分批执行任务
            for (AbstractJobThread abstractJobThread : jobList) {
                completionService.submit(abstractJobThread);
            }
            //阻塞获取值
            futureTake(jobList.size(), type);
            long end = System.currentTimeMillis();
            log.info(
                    "\n********************************************************************************************************" +
                    "\n* type                 : {}" +
                    "\n* job size             : {} " +
                    "\n* start time           : {} " +
                    "\n* end time             : {} " +
                    "\n* spend total time     : {}ms " +
                    "\n********************************************************************************************************"
                    , type
                    , jobList.size()
                    , DateFormatUtils.format(begin, "yyyy-MM-dd HH:mm:ss.SSS")
                    , DateFormatUtils.format(end, "yyyy-MM-dd HH:mm:ss.SSS")
                    , (end - begin));
            if (moLogger != null) {
                moLogger.stop();
            }
        });
        this.recordDto.setEndTime(new Date());
        this.recordDto.setStatus(1);
        int count = this.recordList.stream().mapToInt(ExecuteRecord::getFailed).sum();
        MigrationTaskStatus status = MigrationTaskStatus.SUCCESS;
        if (count > 0) {
            status = MigrationTaskStatus.FAILED;
        }
        this.recordDto.setMigrationTaskStatus(status);
        try {
            this.recordDto.setSchemaRecordsMap(convertRecords(this.recordList));
            this.recordDto.setTableMappers(this.tableMappers);
        } catch (Exception e) {
            log.error(e.getMessage(), e);
            status = MigrationTaskStatus.FAILED;
        }
        this.setMigrationTaskStatus(status);
        this.recordDao.save(this.recordDto);
        MigrationObjectService.updateTaskStatus(taskId, status);
        MigrationObjectService.removeTaskStatus(taskId);
        threadPoolTaskExecutor.shutdown();
    }

    public MigrationRecordDto getRunningRecord() {
        MigrationRecordDto runningRecord = this.recordDto;
        Map<String, List<ExecuteRecord>> convertRecords = convertRecords(this.recordList);
        runningRecord.setSchemaRecordsMap(convertRecords);
        return runningRecord;
    }

    /**
     * 获取任务执行的值
     */
    private void futureTake(int jobSize, MigrationObjectType type) {
        List<JobThreadRecord> jobThreadRecords = new ArrayList<>();
        for (int i = 0; i < jobSize; i++) {
            try {
                Future<JobThreadRecord> future = completionService.take();
                JobThreadRecord jobThreadRecord = future.get();
                jobThreadRecords.add(jobThreadRecord);
            } catch (InterruptedException | ExecutionException e) {
                log.error("futureTake exception", e);
            }
        }
        List<ExecuteRecord> executeRecords = new ArrayList<>();
        //获取迁移成功的映射表
        List<TableMapper> tableMappers = jobThreadRecords.stream()
                .map(JobThreadRecord::getDbObject)
                .filter(m -> MigrationObjectType.TABLE.equals(m.getMigrationObjectType()))
                .map(MigrationObject::getTableMapper)
                .collect(Collectors.toList());
        this.tableMappers.addAll(tableMappers);
        Map<String, List<JobThreadRecord>> jobListMap = jobThreadRecords
                .stream()
                .collect(Collectors.groupingBy(JobThreadRecord::getSchemaName));
        //任务结果转换
        jobListMap.forEach((key, value) -> {
            int success = value.stream().mapToInt(jobRecord -> jobRecord.getFlag() ? 1 : 0).sum();
            int failed = value.stream().mapToInt(jobRecord -> jobRecord.getFlag() ? 0 : 1).sum();
            List<MigrationObject> errorObjs = Lists.newArrayList();
            List<MigrationObject> successObjs = Lists.newArrayList();
            List<String> errorLogList = Lists.newArrayList();
            successObjs = value.stream()
                    .map(JobThreadRecord::getDbObject)
                    .filter(errorObj -> Status.Finished == errorObj.getStatus())
                    .collect(Collectors.toList());
            if (failed != 0) {
                errorObjs = value.stream()
                        .map(JobThreadRecord::getDbObject)
                        .filter(errorObj -> Status.Failure == errorObj.getStatus())
                        .collect(Collectors.toList());
                errorLogList = value.stream().map(JobThreadRecord::getErrorLog).filter(Objects::nonNull).collect(Collectors.toList());
            }
            ExecuteRecord record = ExecuteRecord.builder()
                    .schemaName(key)
                    .type(type)
                    .total(value.size())
                    .succeed(success)
                    .errorObjs(errorObjs)
                    .successObjs(successObjs)
                    .errorLogs(errorLogList)
                    .failed(failed)
                    .build();
            executeRecords.add(record);
        });
        this.recordList.addAll(executeRecords);
    }

    @Override
    public void close() {
        this.stop = true;
        this.getRecordDao().save(recordDto);
    }

}
